package com.atguigu.flink.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/14
 *
 * <p>Demo: combining an incremental aggregation with a window function so that the
 * emitted result can also carry the time attributes of the window it belongs to.
 *
 * <p>The job reads text lines from a socket, maps them to {@link WaterSensor} records,
 * sums their {@code vc} values per 5-second tumbling processing-time window and prints
 * one line per window, prefixed with whatever {@code MyUtil.parseTimeWindow} renders
 * for that window.
 *
 * <p>Two alternative ways of obtaining the {@link TimeWindow} (reduce + window function,
 * and a full-window process function) are kept as commented-out reference code at the
 * end of the pipeline.
 */
public class Demo9_GetTimeWindowTimeAttr
{
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        // Pin the "rest.port" option so the local REST endpoint / web UI is reachable on a known port.
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.setParallelism(2);

        env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            // NOTE: windowAll() below collects every element into the same window regardless
            // of key, so this keyBy does not influence how elements are grouped.
            .keyBy(WaterSensor::getId)
            .windowAll(
                TumblingProcessingTimeWindows.of(Time.seconds(5L))
            )
            .aggregate(
                /*
                    AggregateFunction<IN, ACC, OUT>
                        IN : WaterSensor - the incoming element
                        ACC: Integer     - the running sum of vc
                        OUT: String      - the aggregated result handed to the window function
                 */
                new AggregateFunction<WaterSensor, Integer, String>()
                {
                    // Creates a fresh accumulator; invoked when a new aggregate is started.
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    // Folds one input element into the accumulator; invoked once per element.
                    @Override
                    public Integer add(WaterSensor value, Integer accumulator) {
                        System.out.println("Demo9_GetTimeWindowTimeAttr.add");
                        return accumulator + value.getVc();
                    }

                    // Extracts the final result from the accumulator; invoked when the window fires.
                    @Override
                    public String getResult(Integer accumulator) {
                        return "sumVC:" + accumulator;
                    }

                    // Combines two partial sums. Tumbling windows never merge, so this is not
                    // expected to run here, but merging assigners (e.g. session windows) call it,
                    // so it must return a valid accumulator rather than null.
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                },
                /*
                    AllWindowFunction<V, R, W> windowFunction
                        V: the final result produced by the AggregateFunction above
                        R: the output type of this function (freely chosen)
                        W: the window type
                 */
                new AllWindowFunction<String, String, TimeWindow>()
                {
                    /**
                     * Prefixes the pre-aggregated result with the window's time attributes.
                     *
                     * @param window the window that fired
                     * @param values the AggregateFunction output; only the first element is read
                     * @param out    collector receiving the formatted line
                     */
                    @Override
                    public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) throws Exception {
                        System.out.println("Demo9_GetTimeWindowTimeAttr.apply");
                        String result = values.iterator().next();
                        out.collect(MyUtil.parseTimeWindow(window) + ":" + result);
                    }
                })
            // ---------------------------------------------------------------------------------
            // Alternative 1 (disabled): reduce + AllWindowFunction.
            //
            //   AllWindowFunction<IN, OUT, W extends Window>
            //       invoked only once, when the window closes
            //       IN : the output of the ReduceFunction
            //       OUT: the final output of this function (freely chosen)
            //       W  : the window type
            //   It makes up for the limitation of reduce (no access to the window).
            //
            // .reduce(
            //     new ReduceFunction<WaterSensor>()
            //     {
            //         @Override
            //         public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
            //             System.out.println("Demo9_GetTimeWindowTimeAttr.reduce");
            //             value1.setVc(value1.getVc() + value2.getVc());
            //             return value1;
            //         }
            //     },
            //     new AllWindowFunction<WaterSensor, String, TimeWindow>()
            //     {
            //         // Iterable<WaterSensor> values: the final result of the ReduceFunction; a single record.
            //         @Override
            //         public void apply(TimeWindow window, Iterable<WaterSensor> values, Collector<String> out) throws Exception {
            //             System.out.println("Demo9_GetTimeWindowTimeAttr.apply");
            //             // The final result aggregated by the ReduceFunction.
            //             WaterSensor waterSensor = values.iterator().next();
            //             out.collect(MyUtil.parseTimeWindow(window) + ":" + waterSensor.getVc());
            //         }
            //     }
            // )
            //
            // ---------------------------------------------------------------------------------
            // Alternative 2 (disabled): full-window processing.
            //
            //   ProcessAllWindowFunction<IN, OUT, W extends Window>
            //       The window's time range is only available to functions whose signature
            //       carries the W extends Window type parameter.
            //
            // .process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>()
            // {
            //     @Override
            //     public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
            //         // Get the time range of the current window.
            //         TimeWindow window = context.window();
            //         out.collect(MyUtil.parseTimeWindow(window) + ":" + MyUtil.parseToList(elements));
            //     }
            // })
            // ---------------------------------------------------------------------------------
            .print();

        env.execute();

    }
}
